Kafka 偏移量

Kafka 0.9 之前版本

这里的偏移量是指 kafka consumer offset,在 Kafka 0.9 版本之前消费者偏移量默认被保存在 zookeeper 中(/consumers/<group.id>/offsets//),因此在初始化消费者的时候需要指定 zookeeper.hosts。

Kafka 0.9 之后版本

随着 Kafka consumer 在实际场景的不断应用,社区发现旧版本 consumer 把位移提交到 ZooKeeper 的做法并不合适。ZooKeeper 本质上只是一个协调服务组件,它并不适合作为位移信息的存储组件,毕竟频繁高并发的读/写操作并不是 ZooKeeper 擅长的事情。因此在 0.9 版本开始 consumer 将位移提交到 Kafka 的一个内部 topic(__consumer_offsets)中,该主题默认有 50 个分区,每个分区 3 个副本。

消息处理语义

  • at-most-once:最多一次,消息可能丢失,但不会被重复处理;
  • at-least-once:至少一次,消息不会丢失,但可能被处理多次;
  • exactly-once:精确一次,消息一定会被处理且只会被处理一次。
  • 若 consumer 在消息消费之前就提交位移,那么便可以实现 at-most-once,因为若 consumer 在提交位移与消息消费之间崩溃,则 consumer 重启后会从新的 offset 位置开始消费,前面的那条消息就丢失了;相反地,
  • 若提交位移在消息消费之后,则可实现 at-least-once 语义。由于 Kafka 没有办法保证消息处理成功与位移提交在同一个事务中完成,若消息消费成功了,也提交位移了,但是处理失败了,因此 Kafka 默认提供的就是 at-least-once 的处理语义。

kafka offset 提交方式

  • 默认情况下,consumer 是自动提交位移的,自动提交间隔是 5 秒,可以通过设置 auto.commit.interval.ms 参数可以控制自动提交的间隔。

    自动位移提交的优势是降低了用户的开发成本使得用户不必亲自处理位移提交;劣势是用户不能细粒度地处理位移的提交,特别是在有较强的精确一次处理语义时(在这种情况下,用户可以使用手动位移提交)。

  • 手动位移提交就是用户自行确定消息何时被真正处理完并可以提交位移,用户可以确保只有消息被真正处理完成后再提交位移。如果使用自动位移提交则无法保证这种时序性,因此在这种情况下必须使用手动提交位移。

    设置使用手动提交位移非常简单,仅仅需要在构建 KafkaConsumer 时设置 enable.auto.commit=false,然后调用 commitSync 或 commitAsync 方法即可。

Spark 位移处理方式

auto.offset.reset设置思路

  • 对于 auto.offset.reset 个人推荐设置为 earliest,初次运行的时候,由于 __consumer_offsets 没有相关偏移量信息,因此消息会从最开始的地方读取;当第二次运行时,由于 __consumer_offsets 已经存在消费的 offset 信息,因此会根据 __consumer_offsets 中记录的偏移信息继续读取数据。

  • 此外,对于使用 zookeeper 管理偏移量而言,只需要删除对应的节点,数据即可从头读取,也是非常方便。不过如果你希望从最新的地方读取数据,不需要读取旧消息,则可以设置为 latest。

    1
    2
    3
    earilist:提交过分区,从Offset处读取,如果没有提交过offset,从头读取
    latest:提交过分区,从Offset处读取,没有从最新的数据开始读取
    None:如果没有提交offset,就会报错,提交过offset,就从offset处读取

订阅 Kafka 主题

  • 基于正则订阅主题,有以下好处:

    1
    2
    3
    4
    5
    6
    无需罗列主题名,一两个主题还好,如果有几十个,罗列过于麻烦了;
    可实现动态订阅的效果(新增的符合正则的主题也会被读取)。

    stream = KafkaUtils.createDirectStream[String, String](ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
  • LocationStrategies 分配分区策略,LocationStrategies:根据给定的主题和集群地址创建consumer

    1
    2
    3
    4
    创建DStream,返回接收到的输入数据
    LocationStrategies.PreferConsistent:持续的在所有Executor之间匀分配分区 (均匀分配,选中的每一个Executor都会分配 partition)
    LocationStrategies.PreferBrokers: 如果executorkafka brokers 在同一台机器上,选择该executor
    LocationStrategies.PreferFixed: 如果机器不是均匀的情况下,可以指定特殊的hosts。当然如果不指定,采用 LocationStrategies.PreferConsistent模式
  • SparkStreaming 序列化问题

    在 driver 中使用到的变量或者对象无需序列化,传递到 exector 中的变量或者对象需要序列化。因此推荐的做法是,在 exector 中最好只处理数据的转换,在 driver 中对处理的结果进行存储等操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
     stream.foreachRDD(rdd => {

    // driver 代码运行区域
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    kafkaOffset.updateOffset(offsetRanges)

    // exector 代码运行区域
    val resultRDD = rdd.map(xxxxxxxx)
    //endregion

    //对结果进行存储
    resultRDD.saveToES(xxxxxx)
    kafkaOffset.commitOffset(offsetRanges)
    })

使用老式zookeeper手动管理位移代码分析

  • Zookeeper 偏移量管理ZkKafkaOffset实现,借助 zookeeper 管理工具可以对任何一个节点的信息进行修改、删除,如果希望从最开始读取消息,则只需要删除 zk 某个节点的数据即可。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    import org.I0Itec.zkclient.ZkClient
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka010.OffsetRange

    import scala.collection.JavaConverters._

    class ZkKafkaOffset(getClient: () => ZkClient, getZkRoot : () => String) {

    // 定义为 lazy 实现了懒汉式的单例模式,解决了序列化问题,方便使用 broadcast
    lazy val zkClient: ZkClient = getClient()
    lazy val zkRoot: String = getZkRoot()

    // offsetId = md5(groupId+join(topics))
    // 初始化偏移量的 zk 存储路径 zkRoot
    def initOffset(offsetId: String) : Unit = {
    if(!zkClient.exists(zkRoot)){
    zkClient.createPersistent(zkRoot, true)
    }
    }

    // 从 zkRoot 读取偏移量信息
    def getOffset(): Map[TopicPartition, Long] = {
    val keys = zkClient.getChildren(zkRoot)
    var initOffsetMap: Map[TopicPartition, Long] = Map()
    if(!keys.isEmpty){
    for (k:String <- keys.asScala) {
    val ks = k.split("!")
    val value:Long = zkClient.readData(zkRoot + "/" + k)
    initOffsetMap += (new TopicPartition(ks(0), Integer.parseInt(ks(1))) -> value)
    }
    }
    initOffsetMap
    }

    // 根据单条消息,更新偏移量信息
    def updateOffset(consumeRecord: ConsumerRecord[String, String]): Boolean = {
    val path = zkRoot + "/" + consumeRecord.topic + "!" + consumeRecord.partition
    zkClient.writeData(path, consumeRecord.offset())
    true
    }

    // 消费消息前,批量更新偏移量信息
    def updateOffset(offsetRanges: Array[OffsetRange]): Boolean = {
    for (offset: OffsetRange <- offsetRanges) {
    val path = zkRoot + "/" + offset.topic + "!" + offset.partition
    if(!zkClient.exists(path)){
    zkClient.createPersistent(path, offset.fromOffset)
    }
    else{
    zkClient.writeData(path, offset.fromOffset)
    }
    }
    true
    }

    // 消费消息后,批量提交偏移量信息
    def commitOffset(offsetRanges: Array[OffsetRange]): Boolean = {
    for (offset: OffsetRange <- offsetRanges) {
    val path = zkRoot + "/" + offset.topic + "!" + offset.partition
    if(!zkClient.exists(path)){
    zkClient.createPersistent(path, offset.untilOffset)
    }
    else{
    zkClient.writeData(path, offset.untilOffset)
    }
    }
    true
    }

    def finalize(): Unit = {
    zkClient.close()
    }
    }

    object ZkKafkaOffset{
    def apply(cong: SparkConf, offsetId: String): ZkKafkaOffset = {
    val getClient = () =>{
    val zkHost = cong.get("kafka.zk.hosts", "127.0.0.1:2181")
    new ZkClient(zkHost, 30000)
    }
    val getZkRoot = () =>{
    val zkRoot = "/kafka/ss/offset/" + offsetId
    zkRoot
    }
    new ZkKafkaOffset(getClient, getZkRoot)
    }
    }
  • Spark Streaming 消费 Kafka 消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    第一步:val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset(ssc)
    第二步:stream = KafkaUtils.createDirectStream[String, String](ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
    第三步:处理后,kafkaOffset.commitOffset(offsetRanges)

    import scala.collection.JavaConverters._

    object RtDataLoader {
    def main(args: Array[String]): Unit = {
    // 从配置文件读取 kafka 配置信息
    val props = new Props("xxx.properties")
    val groupId = props.getStr("groupId", "")
    if(StrUtil.isBlank(groupId)){
    StaticLog.error("groupId is empty")
    return
    }
    val kfkServers = props.getStr("kfk_servers")
    if(StrUtil.isBlank(kfkServers)){
    StaticLog.error("bootstrap.servers is empty")
    return
    }
    val topicStr = props.getStr("topics")
    if(StrUtil.isBlank(kfkServers)){
    StaticLog.error("topics is empty")
    return
    }

    // KAFKA 配置设定
    val topics = topicStr.split(",")
    val kafkaConf = Map[String, Object](
    "bootstrap.servers" -> kfkServers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "receive.buffer.bytes" -> (102400: java.lang.Integer),
    "max.partition.fetch.bytes" -> (5252880: java.lang.Integer),
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val conf = new SparkConf().setAppName("ss-kafka").setIfMissing("spark.master", "local[2]")

    // streaming 相关配置
    conf.set("spark.streaming.stopGracefullyOnShutdown","true")
    conf.set("spark.streaming.backpressure.enabled","true")
    conf.set("spark.streaming.backpressure.initialRate","1000")

    // 设置 zookeeper 连接信息
    conf.set("kafka.zk.hosts", props.getStr("zk_hosts", "sky-01:2181"))

    // 创建 StreamingContext
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(5))

    // 根据 groupId 和 topics 获取 offset
    val offsetId = SecureUtil.md5(groupId + topics.mkString(","))
    val kafkaOffset = ZkKafkaOffset(ssc.sparkContext.getConf, offsetId)
    kafkaOffset.initOffset(ssc, offsetId)
    val customOffset: Map[TopicPartition, Long] = kafkaOffset.getOffset(ssc)

    // 创建数据流
    var stream:InputDStream[ConsumerRecord[String, String]] = null
    if(topicStr.contains("*")) {
    StaticLog.warn("使用正则匹配读取 kafka 主题:" + topicStr)
    stream = KafkaUtils.createDirectStream[String, String](ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(topicStr), kafkaConf, customOffset))
    }
    else {
    StaticLog.warn("待读取的 kafka 主题:" + topicStr)
    stream = KafkaUtils.createDirectStream[String, String](ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaConf, customOffset))
    }

    // 消费数据
    stream.foreachRDD(rdd => {
    // 消息消费前,更新 offset 信息
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    kafkaOffset.updateOffset(offsetRanges)

    //region 处理详情数据
    StaticLog.info("开始处理 RDD 数据!")
    //endregion

    // 消息消费结束,提交 offset 信息
    kafkaOffset.commitOffset(offsetRanges)
    })
    ssc.start()
    ssc.awaitTermination()
    }
    }
  • setStartFromGroupOffsets()【默认消费策略】默认读取上次保存的offset信息如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据

  • setStartFromEarliest()从最早的数据开始进行消费,忽略存储的offset信息

  • setStartFromLatest()从最新的数据进行消费,忽略存储的offset信息

  • setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)从指定位置进行消费。

  • 当checkpoint机制开启的时候,KafkaConsumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。

  • 为了能够使用支持容错的kafka Consumer,需要开启checkpointenv.enableCheckpointing(5000); // 每5s checkpoint一次

  • Kafka Consumers Offset 自动提交有以下两种方法来设置,可以根据job是否开启checkpoint来区分:

    (1) Flink Checkpoint关闭时: 可以通过Kafka下面两个Properties参数配置

    1
    2
    enable.auto.commit
    auto.commit.interval.ms

    (2) Checkpoint开启时:当执行checkpoint的时候才会保存offset,这样保证了kafka的offset和checkpoint的状态偏移量保持一致。可以通过这个参数设置

    1
    setCommitOffsetsOnCheckpoints(boolean)

    这个参数默认就是true。表示在checkpoint的时候提交offset, 此时,kafka中的自动提交机制就会被忽略。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    //获取Flink的运行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    //checkpoint配置
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    //设置statebackend
    env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

    String topic = "kafkaConsumer";
    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers","SparkMaster:9092");
    prop.setProperty("group.id","kafkaConsumerGroup");

    FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);

    myConsumer.setStartFromGroupOffsets();//默认消费策略
    myConsumer.setCommitOffsetsOnCheckpoints(true);
    DataStreamSource<String> text = env.addSource(myConsumer);

    text.print().setParallelism(1);

    env.execute("StreamingFromCollection");
  • Flink KafkaConsumer允许配置向 Kafka brokers(或者向Zookeeper)提交offset的行为。需要注意的是,Flink Kafka Consumer并不依赖于这些提交回Kafka或Zookeeper的offset来保证容错。这些被提交的offset只是意味着Flink将消费的状态暴露在外以便于监控。

  • FlinkKafkaConsumer提供了一套健壮的机制保证了在高吞吐量的情况下exactly-once的消费Kafka的数据,它的API的使用与配置也比较简单,同时也便于监控。

  • barrier可以理解为checkpoint之间的分隔符,在它之前的data属于前一个checkpoint,而在它之后的data属于另一个checkpoint。同时,barrier会由source(如FlinkKafkaConsumer)发起,并混在数据中,同数据一样传输给下一级的operator,直到sink为止。如果barrier已经被sink收到,那么说明checkpoint已经完成了(这个checkpoint的状态为completed并被存到了state backend中),它之前的数据已经被处理完毕并sink。

  • Flink异步记录checkpoint的行为是由我们的来配置的,只有当我们设置了enableCheckpointing()时,Flink才会在checkpoint完成时(整个job的所有的operator都收到了这个checkpoint的barrier才意味这checkpoint完成,具体参考我们对Flink checkpoint的介绍)将offset记录起来并提交,这时候才能够保证exactly-once。

  • Kafka Producer的容错-Kafka 0.9 and 0.10

    1
    2
    3
    4
    5
      如果Flink开启了checkpoint,针对FlinkKafkaProducer09和FlinkKafkaProducer010 可以提供 at-least-once的语义,还需要配置下面两个参数:
    setLogFailuresOnly(false)
    setFlushOnCheckpoint(true)
    注意:建议修改kafka 生产者的重试次数retries【这个参数的值默认是0
    复制代码
  • Kafka Producer的容错-Kafka 0.11,如果Flink开启了checkpoint,针对FlinkKafkaProducer011 就可以提供 exactly-once的语义,但是需要选择具体的语义

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    具体的语义设置方式 
    Semantic.NONE
    Semantic.AT_LEAST_ONCE【默认】
    Semantic.EXACTLY_ONCE

    checkpoint配置
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    //第一种解决方案,设置FlinkKafkaProducer011里面的事务超时时间
    //设置事务超时时间
    //prop.setProperty("transaction.timeout.ms",60000*15+"");

    //第二种解决方案,设置kafka的最大事务超时时间,主要是kafka的配置文件设置。
    //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());

    //使用仅一次语义的kafkaProducer
    FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

    text.addSink(myProducer);